{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# GeoWave Spatial Join Demo\n",
    "This demo runs a distance join between a GPX dataset for Germany and the GDELT dataset. It uses our tiered join algorithm on two large datasets to find which GPX points are within a certain distance of GDELT events.\n",
    "\n",
    "Running this join on Spark with a naive Spark SQL query would take 20+ hours to possibly get a result. With this algorithm and GeoWave's tiered indexing strategy, the same join completes in 2-5 hours, depending on cluster size and configuration. This algorithm is not the answer to every join situation, however. For smaller datasets that fit into memory, the join in its current implementation performs extra work, and native Spark joins are still the better option.\n",
    "\n",
    "The current implementation of this algorithm assumes the worst-case scenario for each dataset. This will be improved in upcoming updates and releases. Currently, the algorithm dynamically indexes each set even when the underlying indexing method for each RDD is the same. This requires touching every record in the dataset, which can be avoided for the majority of joins, where both sets use the same indexing method.\n",
    "\n",
    "Simply focus a cell and use [SHIFT + ENTER] to run the code."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Import pixiedust\n",
    "Start by importing pixiedust. If all bootstrap and install steps ran correctly, the output below should show the pixiedust database opening successfully with no errors.\n",
    "Depending on the version of pixiedust that gets installed, it may ask you to update.\n",
    "If so, uncomment and run this first cell."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#!pip install --user --upgrade pixiedust"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Stop old session\n",
    "spark.stop()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Picking the right parallelism\n",
    "It's important to pick a parallelism high enough to partition the data into chunks small enough to support the join. Relying on the default Spark sets for the cluster size when working with an extremely large dataset is a recipe for OOM errors on the executors.\n",
    "\n",
    "If you're having trouble finding the right parallelism, look at the Spark history server and check the size of your largest partition. Aim for a maximum partition size of ~64MB, preferably smaller."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Create new session with adequate parallelism\n",
    "from pyspark.sql import SparkSession\n",
    "spark = SparkSession.builder\\\n",
    ".config('spark.serializer','org.apache.spark.serializer.KryoSerializer')\\\n",
    ".config('spark.kryo.registrator', 'org.locationtech.geowave.analytic.spark.GeoWaveRegistrator')\\\n",
    ".config('spark.default.parallelism', '6000')\\\n",
    ".getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(spark.__dict__)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sc = spark.sparkContext\n",
    "import pixiedust\n",
    "import geowave_pyspark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pixiedust.enableJobMonitor()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Print Spark info\n",
    "print('Spark Version: {0}'.format(sc.version))\n",
    "print('Python Version: {0}'.format(sc.pythonVer))\n",
    "print('Application Name: {0}'.format(sc.appName))\n",
    "print('Application ID: {0}'.format(sc.applicationId))\n",
    "print('Spark Master: {0}'.format(sc.master))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Download and ingest the GPX data\n",
    "*NOTE* Depending on cluster size, the copy can sometimes fail. This appears to be a race condition in the copy command when downloading the files from S3, and it may cause the following import into Accumulo to fail. You can check the Accumulo tables by looking at port 9995 of the EMR cluster. There should be 5 tables after importing."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "s3-dist-cp -D mapreduce.task.timeout=60000000 --src=s3://geowave-gpx-data/gpx --dest=hdfs://$HOSTNAME:8020/tmp/ "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "/opt/accumulo/bin/accumulo shell -u root -p secret -e \"importtable geowave.germany_gpx_SPATIAL_IDX /tmp/spatial\"\n",
    "/opt/accumulo/bin/accumulo shell -u root -p secret -e \"importtable geowave.germany_gpx_GEOWAVE_METADATA /tmp/metadata\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "# configure geowave connection params for store\n",
    "geowave store add germany_gpx --gwNamespace geowave.germany_gpx -t accumulo -i accumulo -u root -p secret --zookeeper $HOSTNAME:2181"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Download GDELT Data\n",
    "Download the GDELT data necessary to perform the join. You can either download the quickstart events, which come to roughly 120k features, or download all events from 2010 onward, which come to roughly 500k+ features.\n",
    "If you want the larger dataset, replace \"TIME_REGEX\" with \"LARGER_TIME_REGEX\" in the cell below before running it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "cd /mnt/tmp\n",
    "wget s3.amazonaws.com/geowave/latest/scripts/emr/quickstart/geowave-env.sh\n",
    "source /mnt/tmp/geowave-env.sh\n",
    "\n",
    "#setup a larger regex for every event after 2010\n",
    "export LARGER_TIME_REGEX=201\n",
    "\n",
    "mkdir gdelt\n",
    "cd gdelt\n",
    "wget http://data.gdeltproject.org/events/md5sums\n",
    "for file in `cat md5sums | cut -d' ' -f3 | grep \"^${TIME_REGEX}\"` ; \\\n",
    "do wget http://data.gdeltproject.org/events/$file ; done\n",
    "md5sum -c md5sums 2>&1 | grep \"^${TIME_REGEX}\"\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Ingest GDELT Data\n",
    "Depending on how many events were downloaded above, this step could take anywhere from 10 minutes to hours. The CQL filter restricts the ingest to a small portion of the events, those inside a bounding box over Europe."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "\n",
    "# We have to source here again because bash runs in a separate sub process each cell.\n",
    "source /mnt/tmp/geowave-env.sh\n",
    "\n",
    "# clear old potential runs\n",
    "geowave store clear gdelt\n",
    "geowave store rm gdelt\n",
    "\n",
    "# configure geowave connection params for accumulo stores \"gdelt\"\n",
    "geowave store add gdelt --gwNamespace geowave.gdelt -t accumulo -i accumulo -u root -p secret --zookeeper $HOSTNAME:2181\n",
    "\n",
    "# configure a spatial index\n",
    "geowave index add gdelt gdeltspatial -t spatial --partitionStrategy round_robin --numPartitions $NUM_PARTITIONS\n",
    "\n",
    "# run the ingest for a 10x10 deg bounding box over Europe\n",
    "geowave ingest localtogw /mnt/tmp/gdelt gdelt gdeltspatial -f gdelt \\\n",
    "--gdelt.cql \"BBOX(geometry, 0, 50, 10, 60)\"\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#grab classes from jvm\n",
    "hbase_options_class = sc._jvm.org.locationtech.geowave.datastore.hbase.cli.config.HBaseRequiredOptions\n",
    "accumulo_options_class = sc._jvm.org.locationtech.geowave.datastore.accumulo.cli.config.AccumuloRequiredOptions\n",
    "\n",
    "query_options_class = sc._jvm.org.locationtech.geowave.core.store.query.QueryOptions\n",
    "geowave_rdd_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveRDD\n",
    "indexed_rdd_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveIndexedRDD\n",
    "rdd_loader_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveRDDLoader\n",
    "rdd_options_class = sc._jvm.org.locationtech.geowave.analytic.spark.RDDOptions\n",
    "sf_df_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.SimpleFeatureDataFrame\n",
    "byte_array_class = sc._jvm.org.locationtech.geowave.core.index.ByteArrayId\n",
    "\n",
    "#grab classes for spatial join\n",
    "join_runner_class = sc._jvm.org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner\n",
    "index_builder_class = sc._jvm.org.locationtech.geowave.core.geotime.ingest.SpatialDimensionalityTypeProvider.SpatialIndexBuilder\n",
    "geom_intersects_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.udf.GeomIntersects\n",
    "geom_distance_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.udf.GeomWithinDistance\n",
    "\n",
    "udf_registry_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.udf.GeomFunctionRegistry\n",
    "\n",
    "feature_data_adapter_class = sc._jvm.org.locationtech.geowave.adapter.vector.FeatureDataAdapter\n",
    "feature_data_utils = sc._jvm.org.locationtech.geowave.adapter.vector.util.FeatureDataUtils\n",
    "sft_builder_class = sc._jvm.org.geotools.feature.simple.SimpleFeatureTypeBuilder\n",
    "\n",
    "datastore_utils_class = sc._jvm.org.locationtech.geowave.core.store.util.DataStoreUtils\n",
    "\n",
    "udf_registry_class.registerGeometryFunctions(spark._jsparkSession)\n",
    "\n",
    "spatial_encoders_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.GeoWaveSpatialEncoders\n",
    "\n",
    "spatial_encoders_class.registerUDTs()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "#Setup gpx datastore\n",
    "gpx_store = accumulo_options_class()\n",
    "gpx_store.setInstance('accumulo')\n",
    "gpx_store.setUser('root')\n",
    "gpx_store.setPassword('secret')\n",
    "gpx_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')\n",
    "gpx_store.setGeowaveNamespace('geowave.germany_gpx')\n",
    "\n",
    "#Setup gdelt datastore\n",
    "gdelt_store = accumulo_options_class()\n",
    "gdelt_store.setInstance('accumulo')\n",
    "gdelt_store.setUser('root')\n",
    "gdelt_store.setPassword('secret')\n",
    "gdelt_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')\n",
    "gdelt_store.setGeowaveNamespace('geowave.gdelt')\n",
    "\n",
    "#Setup output store\n",
    "output_store = accumulo_options_class()\n",
    "output_store.setInstance('accumulo')\n",
    "output_store.setUser('root')\n",
    "output_store.setPassword('secret')\n",
    "output_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')\n",
    "output_store.setGeowaveNamespace('geowave.joined')\n",
    "\n",
    "gpx_store_plugin = gpx_store.createPluginOptions()\n",
    "gdelt_store_plugin = gdelt_store.createPluginOptions()\n",
    "output_store_plugin = output_store.createPluginOptions()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#loading RDDs and setting up variables for join\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create SpatialJoinRunner object\n",
    "\n",
    "# You have to pass the wrapped java SparkSession object to java functions\n",
    "join_runner = join_runner_class(spark._jsparkSession)\n",
    "\n",
    "# Set data for left side rdd in join\n",
    "join_runner.setLeftStore(gpx_store_plugin)\n",
    "gpx_point = byte_array_class('gpxpoint')\n",
    "join_runner.setLeftAdapterId(gpx_point)\n",
    "\n",
    "# Set data for right side rdd in join\n",
    "join_runner.setRightStore(gdelt_store_plugin)\n",
    "gdelt_event = byte_array_class('gdeltevent')\n",
    "join_runner.setRightAdapterId(gdelt_event)\n",
    "\n",
    "# Set data for output store\n",
    "join_runner.setOutputStore(output_store_plugin)\n",
    "join_runner.setOutputLeftAdapterId(byte_array_class('gpxJoin'))\n",
    "join_runner.setOutputRightAdapterId(byte_array_class('gdeltJoin'))\n",
    "\n",
    "# Set predicate method for join\n",
    "distance_predicate = geom_distance_class(0.01)\n",
    "join_runner.setPredicate(distance_predicate)\n",
    "\n",
    "# Set default partition count for spark objects\n",
    "join_runner.setPartCount(6000)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Run the spatial join\n",
    "Execute the cell below to run the spatial join. This will compare 285 million GPX points against ~100k-~500k GDELT events. The smallest run takes anywhere from 2-5 hours depending on dataset and cluster size. The work is split into 3 jobs: the first two determine which tiers contain data, and the last performs the join between tiers.\n",
    "\n",
    "This is equivalent to running the following SQL command from the sql_context:\n",
    "\n",
    "\"select gpx.\\*, gdelt.\\* from gpx, gdelt where geomDistance(gpx.geom,gdelt.geom) <= 0.01\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "join_runner.run()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create Map of join results\n",
    "Once we have GeoServer layers of our join results, we can use folium to add the WMS layers and display the results on a map."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "\n",
    "geowave store add gpx_joined --gwNamespace geowave.joined -t accumulo -i accumulo -u root -p secret --zookeeper $HOSTNAME:2181\n",
    "\n",
    "# set up geoserver\n",
    "geowave config geoserver \"$HOSTNAME:8000\"\n",
    "\n",
    "# add the gdelt join results layer\n",
    "geowave gs layer add gpx_joined -id gdeltJoin\n",
    "geowave gs style set gdeltJoin --styleName geowave:blue\n",
    "\n",
    "# add the gpx join results layer\n",
    "geowave gs layer add gpx_joined -id gpxJoin\n",
    "geowave gs style set gpxJoin --styleName point"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import owslib\n",
    "from owslib.wms import WebMapService\n",
    "\n",
    "url = \"http://\" + os.environ['HOSTNAME'] + \":8000/geoserver/geowave/wms\"\n",
    "web_map_services = WebMapService(url)\n",
    "\n",
    "#print the layers available from the wms\n",
    "print('\\n'.join(web_map_services.contents.keys()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pixiedust": {
     "displayParams": {
      "handlerId": "tableView"
     }
    }
   },
   "outputs": [],
   "source": [
    "import folium\n",
    "from folium import Map\n",
    "\n",
    "#grab wms info for the gdelt join layer\n",
    "layer = 'gdeltJoin'\n",
    "wms = web_map_services.contents[layer]\n",
    "\n",
    "#build center of map off the layer bbox\n",
    "lon = (wms.boundingBox[0] + wms.boundingBox[2]) / 2.\n",
    "lat = (wms.boundingBox[1] + wms.boundingBox[3]) / 2.\n",
    "center = [lat, lon]\n",
    "\n",
    "m = Map(location = center,zoom_start=10)\n",
    "\n",
    "\n",
    "name = wms.title\n",
    "gdelt = folium.raster_layers.WmsTileLayer(\n",
    "    url=url,\n",
    "    name=name,\n",
    "    fmt='image/png',\n",
    "    transparent=True,\n",
    "    layers=layer,\n",
    "    overlay=True,\n",
    "    COLORSCALERANGE='1.2,28',\n",
    ")\n",
    "gdelt.add_to(m)\n",
    "\n",
    "layer = 'gpxJoin'\n",
    "wms = web_map_services.contents[layer]\n",
    "\n",
    "name = wms.title\n",
    "gpx = folium.raster_layers.WmsTileLayer(\n",
    "    url=url,\n",
    "    name=name,\n",
    "    fmt='image/png',\n",
    "    transparent=True,\n",
    "    layers=layer,\n",
    "    overlay=True,\n",
    "    COLORSCALERANGE='1.2,28',\n",
    ")\n",
    "gpx.add_to(m)\n",
    "\n",
    "folium.LayerControl().add_to(m)\n",
    "m"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python with Pixiedust (Spark 2.3)",
   "language": "python",
   "name": "pythonwithpixiedustspark23"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
